/*
 * Copyright © 2014 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package co.cask.cdap.internal.app.runtime.service.http;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.service.http.HttpContentConsumer;
import co.cask.cdap.api.service.http.HttpContentProducer;
import co.cask.cdap.api.service.http.HttpServiceContext;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.http.BodyConsumer;
import co.cask.http.BodyProducer;
import co.cask.http.HandlerContext;
import co.cask.http.HttpHandler;
import co.cask.http.HttpResponder;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.twill.common.Cancellable;
import org.jboss.netty.handler.codec.http.HttpRequest;

/**
 * An abstract base class for all {@link HttpHandler} generated through the {@link HttpHandlerGenerator}.
 * It hosts the helper methods that the generated handler classes call to look up the user handler,
 * obtain a {@link TransactionContext}, and wrap requests, responders and content consumers.
 *
 * @param <T> Type of the user {@link HttpServiceHandler}.
 */
public abstract class AbstractHttpHandlerDelegator<T extends HttpServiceHandler> implements HttpHandler {

  private static final String NO_TX_SUPPORT_MESSAGE =
    "This instance of HttpServiceContext does not support transactions.";

  private final DelegatorContext<T> context;
  private final MetricsContext metricsContext;

  /**
   * Creates a delegator.
   *
   * @param context the {@link DelegatorContext} used to look up the handler and its service context
   * @param metricsContext the base {@link MetricsContext} handed to the responders created by this class
   */
  protected AbstractHttpHandlerDelegator(DelegatorContext<T> context, MetricsContext metricsContext) {
    this.context = context;
    this.metricsContext = metricsContext;
  }

  /**
   * No-op; this class does nothing on handler initialization.
   */
  @Override
  public void init(HandlerContext context) {
  }

  /**
   * No-op; this class does nothing on handler destruction.
   */
  @Override
  public void destroy(HandlerContext context) {
  }

  /**
   * Returns the {@link HttpServiceHandler} associated with the current thread.
   * This method is called from handler class generated by {@link HttpHandlerGenerator}.
   */
  protected final T getHandler() {
    return context.getHandler();
  }

  /**
   * Returns a {@link TransactionContext} instance to be used for creating transaction.
   * This method is called from handler class generated by {@link HttpHandlerGenerator}.
   *
   * @return a new {@link TransactionContext} obtained from the current {@link TransactionalHttpServiceContext}
   * @throws IllegalStateException if the current service context is not a {@link TransactionalHttpServiceContext}
   */
  @SuppressWarnings("unused")
  protected final TransactionContext getTransactionContext() {
    return getTransactionalServiceContext().newTransactionContext();
  }

  /**
   * Returns a new instance of {@link HttpServiceRequest} that wraps around the given {@link HttpRequest} object.
   * This method is called from handler class generated by {@link HttpHandlerGenerator}.
   *
   * @param request the request to wrap
   * @return a {@link DefaultHttpServiceRequest} wrapping the given request
   */
  @SuppressWarnings("unused")
  protected final HttpServiceRequest wrapRequest(HttpRequest request) {
    return new DefaultHttpServiceRequest(request);
  }

  /**
   * Returns a new instance of {@link DelayedHttpServiceResponder} that wraps around the given {@link HttpResponder}
   * object. This method is called from handler class generated by {@link HttpHandlerGenerator}.
   *
   * @param responder the responder to wrap
   * @param txContext the {@link TransactionContext} used for the {@link Transactional} given to any
   *                  {@link BodyProducer} created through the returned responder
   * @return a new {@link DelayedHttpServiceResponder}
   * @throws IllegalStateException if the current service context is not a {@link TransactionalHttpServiceContext}
   */
  @SuppressWarnings("unused")
  protected final DelayedHttpServiceResponder wrapResponder(HttpResponder responder,
                                                            final TransactionContext txContext) {
    MetricsContext collector = this.metricsContext;
    TransactionalHttpServiceContext txServiceContext = getTransactionalServiceContext();

    // When the handler specification is available, tag the metrics with the handler name.
    if (txServiceContext.getSpecification() != null) {
      collector = metricsContext.childContext(Constants.Metrics.Tag.HANDLER,
                                              txServiceContext.getSpecification().getName());
    }

    return new DelayedHttpServiceResponder(responder, new BodyProducerFactory() {
      @Override
      public BodyProducer create(HttpContentProducer contentProducer, TransactionalHttpServiceContext serviceContext) {
        // Note: getClass() here resolves to this anonymous BodyProducerFactory class.
        final ClassLoader programContextClassLoader = new CombineClassLoader(
          null, ImmutableList.of(contentProducer.getClass().getClassLoader(), getClass().getClassLoader()));

        Transactional transactional = createTransactional(txContext, programContextClassLoader, serviceContext);

        // Capture the context since we need to keep it till the end of the content producing.
        // We don't need to worry about double capturing of the context when HttpContentConsumer is used.
        // This is because when HttpContentConsumer is used, the responder constructed here will get closed and this
        // BodyProducerFactory won't be used.
        final Cancellable contextReleaser = context.capture();
        return new BodyProducerAdapter(contentProducer, transactional, programContextClassLoader,
                                       serviceContext, contextReleaser);
      }
    }, txServiceContext, collector);
  }

  /**
   * Returns a new instance of {@link BodyConsumer} that wraps around the given {@link HttpContentConsumer}
   * and {@link DelayedHttpServiceResponder}.
   *
   * IMPORTANT: This method will also capture the context associated with the current thread, hence after
   * this method is called, no other methods on this class should be called from the current thread.
   *
   * This method is called from handler class generated by {@link HttpHandlerGenerator}.
   *
   * @param consumer the user provided content consumer
   * @param responder the responder handed to the user handler method; it gets closed by this method and a new
   *                  responder is created from it for the returned {@link BodyConsumer}
   * @param txContext the {@link TransactionContext} used for the {@link Transactional} given to the adapters
   * @return a {@link BodyConsumerAdapter} wrapping the given consumer
   * @throws IllegalStateException if the responder already has a buffered response, or if the current service
   *                               context is not a {@link TransactionalHttpServiceContext}
   */
  @SuppressWarnings("unused")
  protected final BodyConsumer wrapContentConsumer(HttpContentConsumer consumer,
                                                   DelayedHttpServiceResponder responder,
                                                   TransactionContext txContext) {
    Preconditions.checkState(!responder.hasBufferedResponse(),
                             "HttpContentConsumer may not be used after a response has already been sent.");

    // Perform all validation and side-effect free setup first, so that a failure here neither leaves the
    // caller's responder closed nor leaves a captured context behind that nobody releases.
    HttpServiceContext serviceContext = getTransactionalServiceContext();
    // Note: getClass() here resolves to the concrete delegator subclass, not to an anonymous class.
    final ClassLoader programContextClassLoader = new CombineClassLoader(
      null, ImmutableList.of(consumer.getClass().getClassLoader(), getClass().getClassLoader()));
    final Transactional transactional = createTransactional(txContext, programContextClassLoader, serviceContext);

    // Close the provided responder since a new one will be created for the BodyConsumerAdapter to use.
    responder.close();
    final Cancellable contextReleaser = context.capture();

    return new BodyConsumerAdapter(new DelayedHttpServiceResponder(responder, new BodyProducerFactory() {
      @Override
      public BodyProducer create(HttpContentProducer contentProducer, TransactionalHttpServiceContext serviceContext) {
        // Transfer the captured context from the content consumer to the content producer
        return new BodyProducerAdapter(contentProducer, transactional,
                                       programContextClassLoader, serviceContext, contextReleaser);
      }
    }), consumer, transactional, programContextClassLoader, contextReleaser);
  }

  /**
   * Returns the current {@link HttpServiceContext} as a {@link TransactionalHttpServiceContext}.
   *
   * @throws IllegalStateException if the current service context does not support transactions
   */
  private TransactionalHttpServiceContext getTransactionalServiceContext() {
    HttpServiceContext serviceContext = context.getServiceContext();
    Preconditions.checkState(serviceContext instanceof TransactionalHttpServiceContext, NO_TX_SUPPORT_MESSAGE);
    return (TransactionalHttpServiceContext) serviceContext;
  }

  /**
   * Creates a {@link Transactional} that runs a {@link TxRunnable} between {@code txContext.start()} and
   * {@code txContext.finish()}, switching the thread context classloader as needed.
   *
   * @param txContext the {@link TransactionContext} used to start, abort and finish the transaction
   * @param programContextClassLoader the context classloader to set while the {@link TxRunnable} runs
   * @param serviceContext the context passed to {@link TxRunnable#run}
   * @return a new {@link Transactional}
   */
  private Transactional createTransactional(final TransactionContext txContext,
                                            final ClassLoader programContextClassLoader,
                                            final HttpServiceContext serviceContext) {
    return new Transactional() {
      @Override
      public void execute(final TxRunnable runnable) throws TransactionFailureException {
        // This method may be called from user code, hence the context classloader may be the
        // program context classloader (or whatever the user has set it to).
        // We need to switch the classloader back to the CDAP system classloader before starting the
        // transaction, since that should happen in the CDAP system context, not in the program context.
        // Note: getClass() here resolves to this anonymous Transactional class.
        ClassLoader callerClassLoader = ClassLoaders.setContextClassLoader(getClass().getClassLoader());
        try {
          txContext.start();
          try {
            // Run the user code with the program context classloader, restoring the previous one afterwards.
            ClassLoader oldClassLoader = ClassLoaders.setContextClassLoader(programContextClassLoader);
            try {
              runnable.run(serviceContext);
            } finally {
              ClassLoaders.setContextClassLoader(oldClassLoader);
            }
          } catch (Throwable t) {
            // NOTE(review): abort(cause) is expected to rethrow the given cause, so that finish() below is
            // not reached after a failure - confirm against the Tephra TransactionContext documentation.
            txContext.abort(new TransactionFailureException("Exception raised from TxRunnable.run()", t));
          }
          txContext.finish();

        } finally {
          // Always hand the thread back with the classloader the caller had set.
          ClassLoaders.setContextClassLoader(callerClassLoader);
        }
      }
    };
  }
}
